package com.hao.sql.udf;

import com.hao.bean.SensorReading;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class UDF_Table_TableFunction {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism( 1 );
        //指定事件时间
        env.setStreamTimeCharacteristic( TimeCharacteristic.EventTime );

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create( env );

        // 2. 读入文件数据，得到DataStream
        DataStream<String> inputStream = env.readTextFile( "E:\\Project\\FlinkTutorials\\Flink-Scala\\src\\main\\resources\\sensor.txt" );

        // 3. 转换成POJO
        DataStream<SensorReading> dataStream = inputStream.map( line -> {
            String[] fields = line.split( "," );
            return new SensorReading( fields[0], new Long( fields[1] ), new Double( fields[2] ) );
        } );


        // 4. 将流转换成表，定义时间特性
        Table sensorTable = tableEnv.fromDataStream( dataStream, "id, timestamp as ts, temperature as temp" );

        // 创建一个表聚合函数实例
        Top2Temp top2Temp = new Top2Temp();
        tableEnv.registerFunction( "top2Temp", top2Temp );
        Table resultTable = sensorTable
                .where( "id = 'sensor_1'" )
                .groupBy( "id" )
                .flatAggregate( "top2Temp(temp) as (temp, rank)" )
                .select( "id, temp, rank" );


        tableEnv.toRetractStream( resultTable, Row.class ).print( "result" );

        env.execute( "Job" );
    }

    // 先定义一个 Accumulator
    public static class Top2TempAcc {
        double highestTemp = Double.MIN_VALUE;
        double secondHighestTemp = Double.MIN_VALUE;
    }

    // 自定义表聚合函数
    public static class Top2Temp extends TableAggregateFunction<Tuple2<Double,
            Integer>, Top2TempAcc> {
        @Override
        public Top2TempAcc createAccumulator() {
            return new Top2TempAcc();
        }

        // 实现计算聚合结果的函数 accumulate
        public void accumulate(Top2TempAcc acc, Double temp) {
            if (temp > acc.highestTemp) {
                acc.secondHighestTemp = acc.highestTemp;
                acc.highestTemp = temp;
            } else if (temp > acc.secondHighestTemp) {
                acc.secondHighestTemp = temp;
            }
        }

        // 实现一个输出结果的方法，最终处理完表中所有数据时调用
        public void emitValue(Top2TempAcc acc, Collector<Tuple2<Double, Integer>>
                out) {
            out.collect( new Tuple2<>( acc.highestTemp, 1 ) );
            out.collect( new Tuple2<>( acc.secondHighestTemp, 2 ) );
        }
    }
}